Pedro Machado
08/05/2020, 4:58 AM
Traceback (most recent call last):
  File "/home/pedro/prefect/my_flow.py", line 187, in <module>
    flow.register(project_name="myproject", labels=["aws"])
  File "/home/pedro/.venvs/prefect/lib/python3.7/site-packages/prefect/core/flow.py", line 1581, in register
    no_url=no_url,
  File "/home/pedro/.venvs/prefect/lib/python3.7/site-packages/prefect/client/client.py", line 668, in register
    project = self.graphql(query_project).data.project  # type: ignore
  File "/home/pedro/.venvs/prefect/lib/python3.7/site-packages/prefect/client/client.py", line 238, in graphql
    raise ClientError(result["errors"])
prefect.utilities.exceptions.ClientError: [{'path': ['project'], 'message': 'field "project" not found in type: \'query_root\'', 'extensions': {'path': '$.selectionSet.project', 'code': 'validation-failed', 'exception': {'message': 'field "project" not found in type: \'query_root\''}}}]
Any ideas?
Hawkar Mahmod
08/05/2020, 11:55 AM
Lukas N.
08/05/2020, 1:17 PM
vault.security.banzaicloud.io/vault-addr:
vault.security.banzaicloud.io/vault-role:
Looking at the code of the agent, the job_spec is hardwired and I cannot modify it. I've also checked KubernetesJobEnvironment, which seems like the way to go for a custom job_spec.yaml file. But in this case, the environment values specified on the Prefect Kubernetes agent (`prefect agent start kubernetes --env NAME=value`) don't get passed to the custom job; they only get passed to the first Job that creates the custom job. Is there another way to have both custom annotations on Jobs and environment values passed from the Prefect Kubernetes agent?
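(For reference, a minimal sketch of wiring a custom job spec through `KubernetesJobEnvironment`, assuming Prefect ~0.13; the annotations and env vars would have to be written into `job_spec.yaml` by hand since, as noted above, the agent's `--env` values don't reach the custom job. The file name and storage settings are placeholders.)
```python
# Sketch: attach a hand-written job_spec.yaml (carrying the desired
# annotations and env vars) to the flow via KubernetesJobEnvironment.
from prefect import Flow
from prefect.environments import KubernetesJobEnvironment
from prefect.environments.storage import Docker

with Flow("annotated-flow") as flow:
    ...

flow.environment = KubernetesJobEnvironment(job_spec_file="job_spec.yaml")
flow.storage = Docker(registry_url="registry.example.com", image_name="annotated-flow")
```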
Adam
08/05/2020, 1:23 PM
Adam
08/05/2020, 3:25 PM
`PostgresExecute` task which runs an insert statement. In addition to the thousands of INSERTs, the task also creates and terminates a Postgres connection each time, which I'd prefer to avoid. Thoughts? My current implementation is in the thread.
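(A minimal sketch of one alternative: perform all the inserts inside a single task over one connection. Assumes psycopg2 and illustrative table/column names; this is not the implementation from the thread.)
```python
# Sketch: one connection for the whole batch instead of one per INSERT.
import psycopg2
from prefect import task

@task
def bulk_insert(rows, dsn):
    conn = psycopg2.connect(dsn)
    try:
        with conn:  # commits on success, rolls back on error
            with conn.cursor() as cur:
                cur.executemany(
                    "INSERT INTO my_table (col_a, col_b) VALUES (%s, %s)",
                    rows,
                )
    finally:
        conn.close()
```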
Riley Hun
08/05/2020, 4:48 PM
Slackbot
08/05/2020, 5:00 PM
Adam
08/05/2020, 7:49 PM
`cloudpickle_deserialization_check`
If I stop calling that function it works again. Am I missing something? Can I not call other methods from within a task?
George Coyne
08/05/2020, 8:53 PM
George Coyne
08/05/2020, 8:56 PM
Amit
08/06/2020, 1:19 PM
`~/.prefect/config.toml` file, is there any other way, like an environment variable or something?
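(For reference, Prefect also reads configuration from environment variables of the form `PREFECT__<SECTION>__<KEY>`, with double underscores mirroring the TOML path; the keys below are just examples.)
```python
# Sketch: override config.toml values via environment variables
# (keys shown are illustrative; set them before Prefect is imported/used).
import os

os.environ["PREFECT__CLOUD__API"] = "http://localhost:4200/graphql"
os.environ["PREFECT__LOGGING__LEVEL"] = "DEBUG"
```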
Adam
08/06/2020, 4:10 PM
Ming Fang
08/06/2020, 8:00 PM
Mark McDonald
08/06/2020, 8:14 PM
Darragh
08/06/2020, 8:36 PM
`prefect backend server` && `prefect server start`:
export PREFECT__CLOUD__API=http://$(curl http://169.254.169.254/latest/meta-data/public-ipv4):4200/graphql
export PREFECT__SERVER__UI__GRAPHQL_URL=http://$(curl http://169.254.169.254/latest/meta-data/public-ipv4):4200/graphql
where the above curls evaluate to the public IP of the EC2 instance. Previously the various containers would be able to talk to the graphql container, but it's not working since the update to 0.13.0.
Instead, in the UI I now get the following:
Couldn't connect
http://localhost:4200/graphql
Any suggestions? Different env vars needed, something else?
Skip Breidbach
08/06/2020, 10:02 PM
`flow.register()` in 0.13? I'm getting `'project_name' is a required field` when registering a flow.
I've verified that my backend is set to `server` and have debugged into the code and don't see how `project_name` can be optional at this point.
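(A minimal sketch of the usual prerequisite with Prefect Server 0.13, where flows are registered into projects; the project name is an example.)
```python
# Sketch: create a project on the server once, then register into it.
from prefect import Client

client = Client()
client.create_project(project_name="examples")
# afterwards: flow.register(project_name="examples")
```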
Skip Breidbach
08/06/2020, 11:47 PM
`--show-flow-logs` and try to run the registered flow via the UI. Agent picks up the job fine but reports:
[2020-08-06 23:43:16,058] ERROR - agent | Error while deploying flow: AttributeError("Can't pickle local object 'NpipeHTTPAdapter.__init__.<locals>.<lambda>'")
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Python38\lib\multiprocessing\spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
  File "C:\Python38\lib\multiprocessing\spawn.py", line 126, in _main
    self = reduction.pickle.load(from_parent)
EOFError: Ran out of input
Marwan Sarieddine
08/07/2020, 3:20 AM
Matthias
08/07/2020, 11:32 AM
bolto6
08/07/2020, 12:32 PM
How can I pass parameters to `b_flow` from `a_flow`'s result and the `dir` parameter?
Flow A:
```python
@task
def any_work(a: str) -> str:
    return f'/tmp/{a}'

with Flow('a_flow') as flow:
    name = Parameter('name', default='all')
    result = any_work(name)
```
Flow B:
```python
@task
def any_work(a: str, b: str) -> str:
    return f'{a}/{b}'

with Flow('b_flow') as flow:
    home = Parameter('home', required=True)
    dir = Parameter('dir', default='any')
    result = any_work(home, dir)
```
Flow C:
```python
a_flow = FlowRunTask(flow_name='a_flow', wait=True)
b_flow = FlowRunTask(flow_name='b_flow', wait=True)

with Flow('c_flow') as flow:
    name = Parameter('name', default='all')
    dir = Parameter('dir', default='any')
    a_flow_state = a_flow(parameters={'name': name})
    # this errors, but how can I pass parameters to b_flow from a_flow's result and the dir parameter?
    result = b_flow(
        upstream_tasks=[a_flow_state],
        parameters={
            'home': a_flow_state.result,
            'dir': dir,
        },
    )
```
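(A minimal sketch of one piece of this, assuming the value produced inside `a_flow` can somehow be made available to `c_flow`, e.g. by writing it to a known location. `FlowRunTask`'s `parameters` argument is resolved like any other task input, so the dict can be assembled in an intermediate task; `build_b_flow_parameters` is a hypothetical helper, not a confirmed solution to passing results between flows.)
```python
# Sketch (hypothetical helper): assemble b_flow's parameters in a task so that
# runtime values are resolved before the FlowRunTask executes. This does not
# by itself retrieve a task result from inside a_flow.
from prefect import task

@task
def build_b_flow_parameters(home: str, dir: str) -> dict:
    return {"home": home, "dir": dir}
```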
Victor Apolonio
08/07/2020, 1:50 PM
Darragh
08/07/2020, 1:52 PM
Ben Fu
08/07/2020, 2:52 PM
alex
08/07/2020, 3:38 PM
Riley Hun
08/07/2020, 4:01 PM
How do `set_upstream` and `set_downstream` work?
Here is an ETL flow I created:
with Flow("Thinknum ETL") as flow:
token = token_task(
version=version,
client_id=client_id,
client_secret=client_secret
)
history = history_task(
dataset_id=dataset_id,
version=version,
token=token
)
loaded_dates = loaded_dates_task(
bucket_name=bucket_name,
dataset_id=dataset_id
)
dates_to_load = get_dates_to_load(
already_loaded_dates=loaded_dates,
history=history
)
datasets_to_load = load_to_gcs_task.map(
dataset_id=unmapped(dataset_id),
date=dates_to_load,
version=unmapped(version),
token=unmapped(token),
bucket_name=unmapped(bucket_name)
)
It DOES seem to work fine, but I don't know when or if I should be applying `set_upstream`/`set_downstream`.
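(For illustration, a minimal sketch of the case where `set_upstream` is typically needed: two tasks that must run in a given order but exchange no data. When one task's output feeds another's input, as in the flow above, the dependency is already implied. Task names below are made up.)
```python
# Sketch: enforce ordering between tasks that share no data.
from prefect import task, Flow

@task
def create_table():
    pass

@task
def load_rows():
    pass

with Flow("ordering-example") as flow:
    t1 = create_table()
    t2 = load_rows()
    t2.set_upstream(t1)  # load_rows runs only after create_table succeeds
```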
Riley Hun
08/07/2020, 6:36 PM
`ModuleNotFoundError`. Do I need to dockerize the flow and specify python paths for this to work?
Thanks in advance!
My folder tree looks something like this:
├── alternative_data_pipelines
│   ├── thinknum
│   │   ├── __init__.py
│   │   └── thinknum.py
│   └── utils
│       ├── __init__.py
│       ├── logging.py
│       ├── snowflake.py
│       └── utils.py
├── requirements.txt
├── setup.py
└── thinknum_flow.py
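(A minimal sketch of one common approach, assuming Docker storage: copy the local package into the image and put it on PYTHONPATH, or install it via the project's setup.py. The paths below are placeholders.)
```python
# Sketch: make the local package importable inside the flow's image
# (paths are placeholders; pip-installing the package via setup.py works too).
from prefect.environments.storage import Docker

storage = Docker(
    files={
        "/path/to/alternative_data_pipelines": "/pipelines/alternative_data_pipelines",
    },
    env_vars={"PYTHONPATH": "/pipelines"},
)
# then: flow.storage = storage
```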
alex
08/07/2020, 9:17 PM
```python
feeds = get_feeds()

with Flow(..):
    for feed in feeds:
        run_feed = GetDataTask(name=f"GetData: {feed.feed_name}")(feed)
        latest = run_feed

        if feed.do_transform():
            transform_feed = TransformTask()(feed, upstream_tasks=[latest])
            latest = transform_feed

        # ... some more optional logic here ...

        failure_recovery = FailureRecoveryTask(trigger=any_failed)(feed, upstream_tasks=[latest])  # should handle any failure in any of the above tasks

    mapped = feeds.map(
        all_feeds,
        target=unmapped("aggregated"),
    )
    mapped.set_upstream([failure_recovery])
```
This structure isn't giving me the DAG I'm looking for, and I was wondering if anyone could give advice on the most "prefect" way to do this. Some questions I had:
• Should I initialize tasks before the flow, as the docs advise, or is this structure ok?
• Is the `if` and `latest =` logic advisable? Or should I run the optional transformations regardless and set a "skipped" state when they don't apply?
• How should I specify the aggregation task? Right now, the map task seems to only have a dependency on the last feed's failure_recovery_task (see the sketch below).
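(On the last point, a minimal sketch of one way to make the aggregation depend on every feed's terminal task rather than only the last loop iteration: collect the per-feed terminal tasks in a list and pass the whole list as `upstream_tasks`. The task classes and names below are simplified stand-ins, not the actual pipeline.)
```python
# Sketch: the aggregation waits on every feed's failure-recovery task.
from prefect import Flow, Task
from prefect.triggers import any_failed

class GetDataTask(Task):
    def run(self, feed):
        return feed

class FailureRecoveryTask(Task):
    def run(self, feed):
        return feed

class AggregateTask(Task):
    def run(self):
        return "aggregated"

feeds = ["feed_a", "feed_b", "feed_c"]

with Flow("feeds") as flow:
    terminal_tasks = []
    for feed in feeds:
        data = GetDataTask(name=f"GetData: {feed}")(feed)
        recovery = FailureRecoveryTask(trigger=any_failed)(feed, upstream_tasks=[data])
        terminal_tasks.append(recovery)
    # depends on all feeds' terminal tasks, not just the last one
    aggregated = AggregateTask()(upstream_tasks=terminal_tasks)
```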
Riley Hun
08/08/2020, 12:51 AM
```python
# parent class
class ABC:
    def __init__(self, user, password):
        self.user = user
        self.password = password

    def query(self):
        pass
```
I want to do something like this:
```python
# task
class Task_A(Task, ABC):
    def run(self):
        pass
```
Currently, I'm just doing this instead:
```python
@task
def task_a(
    user: str = None,
    password: str = None,
    date_col: str = None,
    dataset_id: str = None
):
    conn = ABC(user=user, password=password)
    query = f"SELECT DISTINCT {date_col} FROM EDW_DNA_DB.WI.{dataset_id}"
    query_result = conn.query(query)
    return query_result[date_col].tolist()
```
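(A minimal sketch of the subclass approach, assuming the goal is to keep credentials on the task instance and take the query arguments at run time; the class name is made up and `ABC` refers to the parent class above.)
```python
# Sketch: hold connection details on the Task instance, do the work in run().
from prefect import Task, Flow, Parameter

class QueryDistinctDates(Task):
    def __init__(self, user=None, password=None, **kwargs):
        self.user = user
        self.password = password
        super().__init__(**kwargs)

    def run(self, date_col: str = None, dataset_id: str = None):
        conn = ABC(user=self.user, password=self.password)  # ABC as defined above
        query = f"SELECT DISTINCT {date_col} FROM EDW_DNA_DB.WI.{dataset_id}"
        return conn.query(query)[date_col].tolist()

query_dates = QueryDistinctDates(user="me", password="secret")

with Flow("distinct-dates") as flow:
    dataset_id = Parameter("dataset_id")
    dates = query_dates(date_col="load_date", dataset_id=dataset_id)
```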
bral
08/08/2020, 9:21 AM